package org.codehaus.activemq.service.impl;

import java.util.Iterator;
import java.util.Map;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.broker.BrokerClient;
import org.codehaus.activemq.filter.FilterFactory;
import org.codehaus.activemq.filter.FilterFactoryImpl;
import org.codehaus.activemq.message.ActiveMQDestination;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.service.Dispatcher;
import org.codehaus.activemq.service.MessageContainer;
import org.codehaus.activemq.service.Subscription;
import org.codehaus.activemq.service.SubscriptionContainer;
import org.codehaus.activemq.store.PersistenceAdapter;

public class TransientTopicMessageContainerManager extends DurableTopicMessageContainerManager
{
  private static final Log log = LogFactory.getLog(TransientTopicMessageContainerManager.class);

  public TransientTopicMessageContainerManager(PersistenceAdapter persistenceAdapter) {
    this(persistenceAdapter, new SubscriptionContainerImpl(), new FilterFactoryImpl(), new DispatcherImpl());
  }

  public TransientTopicMessageContainerManager(PersistenceAdapter persistenceAdapter, SubscriptionContainer subscriptionContainer, FilterFactory filterFactory, Dispatcher dispatcher) {
    super(persistenceAdapter, subscriptionContainer, filterFactory, dispatcher);
  }

  public void addMessageConsumer(BrokerClient client, ConsumerInfo info)
    throws JMSException
  {
    if (info.getDestination().isTopic())
      doAddMessageConsumer(client, info);
  }

  public void removeMessageConsumer(BrokerClient client, ConsumerInfo info)
    throws JMSException
  {
    Subscription sub = (Subscription)this.activeSubscriptions.remove(info.getConsumerId());
    if (sub != null) {
      sub.setActive(false);
      this.dispatcher.removeActiveSubscription(client, sub);
      this.subscriptionContainer.removeSubscription(info.getConsumerId());
      sub.clear();
    }
  }

  public void sendMessage(BrokerClient client, ActiveMQMessage message)
    throws JMSException
  {
    ActiveMQDestination dest = (ActiveMQDestination)message.getJMSDestination();
    MessageContainer container;
    Iterator i;
    if ((dest != null) && (dest.isTopic())) {
      container = null;
      if (log.isDebugEnabled()) {
        log.debug("Dispaching to " + this.subscriptionContainer + " subscriptions with message: " + message);
      }
      for (i = this.subscriptionContainer.subscriptionIterator(); i.hasNext(); ) {
        Subscription sub = (Subscription)i.next();
        if ((sub.isTarget(message)) && ((!sub.isDurableTopic()) || (message.getJMSDeliveryMode() == 1))) {
          if (container == null) {
            container = getContainer(message.getJMSDestination().toString());
            container.addMessage(message);
          }
          sub.addMessage(container, message);
        }
      }
    }
  }

  public void deleteSubscription(String clientId, String subscriberName)
    throws JMSException
  {
  }
}